#!/usr/bin/env python3
"""Provides the 1.0, 1.1 and 1.2 protocol classes."""

import uuid

from ambari_stomp.backward import encode
from ambari_stomp.constants import *
from ambari_stomp.exception import ConnectFailedException
from ambari_stomp.listener import *
import ambari_stomp.utils as utils


# Module-level logger used by the protocol classes in this file.
# NOTE(review): `logging` is not imported explicitly in this module; it presumably
# arrives through one of the star imports above - confirm.
log = logging.getLogger("stomp.py")


class Protocol10(ConnectionListener):
  """
  Client-side implementation of STOMP 1.0 (see https://stomp.github.io/stomp-specification-1.0.html).

  Every public method builds the headers for a single protocol command and passes the
  resulting frame to the transport. Applications are normally expected to use a connection
  class (see :py:mod:`stomp.connect`) instead of creating a protocol object themselves.

  :param transport: the object frames are transmitted through; this instance registers itself
      on it as the listener named "protocol-listener"
  :param bool auto_content_length: if True, :py:meth:`send` fills in the content-length header
      whenever the caller has not provided one
  """

  def __init__(self, transport, auto_content_length=True):
    self.transport = transport
    self.auto_content_length = auto_content_length
    # Register on the transport so this object is notified of connection events.
    self.transport.set_listener("protocol-listener", self)
    self.version = "1.0"

  def send_frame(self, cmd, headers=None, body=""):
    """
    Wrap a command, its headers and a body in a frame and transmit it.

    :param str cmd: the protocol command
    :param dict headers: the headers that go into the frame
    :param body: the message content
    """
    self.transport.transmit(utils.Frame(cmd, headers, body))

  def abort(self, transaction, headers=None, **keyword_headers):
    """
    Roll back a transaction.

    :param str transaction: identifier of the transaction to abort (must not be None)
    :param dict headers: extra headers required by the broker
    :param keyword_headers: extra headers required by the broker, given as keywords
    """
    assert transaction is not None, "'transaction' is required"
    abort_headers = utils.merge_headers([headers, keyword_headers])
    abort_headers[HDR_TRANSACTION] = transaction
    self.send_frame(CMD_ABORT, abort_headers)

  def ack(self, id, transaction=None):
    """
    Tell the broker that a message has been consumed.

    :param str id: identifier of the message being acknowledged (must not be None)
    :param str transaction: if given, the acknowledgement becomes part of this transaction
    """
    assert id is not None, "'id' is required"
    ack_headers = {}
    ack_headers[HDR_MESSAGE_ID] = id
    if transaction:
      ack_headers[HDR_TRANSACTION] = transaction
    self.send_frame(CMD_ACK, ack_headers)

  def begin(self, transaction=None, headers=None, **keyword_headers):
    """
    Open a transaction.

    :param str transaction: identifier to use for the transaction; when omitted (or empty)
        a random UUID string is generated
    :param dict headers: extra headers required by the broker
    :param keyword_headers: extra headers required by the broker, given as keywords

    :return: the identifier of the transaction that was started
    :rtype: str
    """
    begin_headers = utils.merge_headers([headers, keyword_headers])
    txn_id = transaction or str(uuid.uuid4())
    begin_headers[HDR_TRANSACTION] = txn_id
    self.send_frame(CMD_BEGIN, begin_headers)
    return txn_id

  def commit(self, transaction=None, headers=None, **keyword_headers):
    """
    Commit a transaction.

    :param str transaction: identifier of the transaction to commit (must not be None,
        despite the default)
    :param dict headers: extra headers required by the broker
    :param keyword_headers: extra headers required by the broker, given as keywords
    """
    assert transaction is not None, "'transaction' is required"
    commit_headers = utils.merge_headers([headers, keyword_headers])
    commit_headers[HDR_TRANSACTION] = transaction
    self.send_frame(CMD_COMMIT, commit_headers)

  def connect(
    self, username=None, passcode=None, wait=False, headers=None, **keyword_headers
  ):
    """
    Open a session with the broker by sending a CONNECT frame.

    :param str username: login name, sent only when not None
    :param str passcode: password, sent only when not None
    :param bool wait: when True, block until the transport reports the connection outcome
    :param dict headers: extra headers required by the broker
    :param keyword_headers: extra headers required by the broker, given as keywords

    :raises ConnectFailedException: if ``wait`` is True and the transport has flagged a
        connection error once waiting is over
    """
    connect_headers = utils.merge_headers([headers, keyword_headers])
    connect_headers[HDR_ACCEPT_VERSION] = self.version

    # Credentials are optional; only the ones actually supplied are put on the frame.
    for header_name, credential in ((HDR_LOGIN, username), (HDR_PASSCODE, passcode)):
      if credential is not None:
        connect_headers[header_name] = credential

    self.send_frame(CMD_CONNECT, connect_headers)

    if not wait:
      return
    self.transport.wait_for_connection()
    if self.transport.connection_error:
      raise ConnectFailedException()

  def disconnect(self, receipt=None, headers=None, **keyword_headers):
    """
    Close the session with the broker.

    Nothing is sent when the transport already reports that it is not connected.

    :param str receipt: receipt id to attach to the DISCONNECT frame; a random UUID string
        is used when omitted
    :param dict headers: extra headers required by the broker
    :param keyword_headers: extra headers required by the broker, given as keywords
    """
    if self.transport.is_connected():
      disconnect_headers = utils.merge_headers([headers, keyword_headers])
      receipt_id = receipt if receipt else str(uuid.uuid4())
      disconnect_headers[HDR_RECEIPT] = receipt_id
      # set_receipt is not defined in this class - presumably supplied by the connection
      # class this protocol is combined with; verify.
      self.set_receipt(receipt_id, CMD_DISCONNECT)
      self.send_frame(CMD_DISCONNECT, disconnect_headers)
    else:
      log.debug("Not sending disconnect, already disconnected")

  def send(self, destination, body, content_type=None, headers=None, **keyword_headers):
    """
    Publish a message.

    :param str destination: where the message goes, e.g. a queue or topic name (must not be None)
    :param body: the message content (must not be None)
    :param str content_type: content type to declare for the message, if any
    :param dict headers: extra headers required by the broker
    :param keyword_headers: extra headers required by the broker, given as keywords
    """
    assert destination is not None, "'destination' is required"
    assert body is not None, "'body' is required"
    send_headers = utils.merge_headers([headers, keyword_headers])
    send_headers[HDR_DESTINATION] = destination
    if content_type:
      send_headers[HDR_CONTENT_TYPE] = content_type
    payload = encode(body)
    # The length is measured on the encoded payload and never overrides a caller-supplied value.
    if self.auto_content_length and payload:
      if HDR_CONTENT_LENGTH not in send_headers:
        send_headers[HDR_CONTENT_LENGTH] = len(payload)
    self.send_frame(CMD_SEND, send_headers, payload)

  def subscribe(
    self, destination, id=None, ack="auto", headers=None, **keyword_headers
  ):
    """
    Start receiving messages from a destination.

    :param str destination: the queue or topic to subscribe to (must not be None)
    :param str id: identifier for the subscription; the header is omitted when not given
    :param str ack: acknowledgement mode - auto, client, or client-individual
        (see http://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE_ack_Header)
    :param dict headers: extra headers required by the broker
    :param keyword_headers: extra headers required by the broker, given as keywords
    """
    assert destination is not None, "'destination' is required"
    subscribe_headers = utils.merge_headers([headers, keyword_headers])
    subscribe_headers[HDR_DESTINATION] = destination
    if id:
      subscribe_headers[HDR_ID] = id
    subscribe_headers[HDR_ACK] = ack
    self.send_frame(CMD_SUBSCRIBE, subscribe_headers)

  def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers):
    """
    Stop receiving messages, identifying the subscription by id, by destination, or both.

    :param str destination: name of the queue or topic to unsubscribe from
    :param str id: identifier of the subscription to cancel
    :param dict headers: extra headers required by the broker
    :param keyword_headers: extra headers required by the broker, given as keywords
    """
    assert not (
      id is None and destination is None
    ), "'id' or 'destination' is required"
    unsubscribe_headers = utils.merge_headers([headers, keyword_headers])
    if id:
      unsubscribe_headers[HDR_ID] = id
    if destination:
      unsubscribe_headers[HDR_DESTINATION] = destination
    self.send_frame(CMD_UNSUBSCRIBE, unsubscribe_headers)


class Protocol11(HeartbeatListener, ConnectionListener):
  """
  Represents version 1.1 of the protocol (see https://stomp.github.io/stomp-specification-1.1.html).

  Compared with :py:class:`Protocol10` this class adds heartbeat handling (through
  ``HeartbeatListener``), header value escaping, the NACK command, and mandatory
  subscription ids.

  Most users should not instantiate the protocol directly. See :py:mod:`stomp.connect` for connection classes.

  :param transport: the transport frames are sent through; this object registers itself on it
      as the listener named "protocol-listener"
  :param (int,int) heartbeats: heartbeat settings handed to ``HeartbeatListener``
  :param bool auto_content_length: Whether to calculate and send the content-length header automatically if it has not been set
  """

  def __init__(self, transport, heartbeats=(0, 0), auto_content_length=True):
    HeartbeatListener.__init__(self, heartbeats)
    self.transport = transport
    self.auto_content_length = auto_content_length
    transport.set_listener("protocol-listener", self)
    self.version = "1.1"

  def _escape_headers(self, headers):
    """
    Escape the header values of a frame in place.

    Backslash, line feed and colon characters in each value are replaced by their
    two-character STOMP escape sequences. Header names are left untouched.

    :param dict(str,str) headers: the headers to escape; the dict is modified in place
    """
    # Only values of existing keys are reassigned, so the dict does not change size
    # while it is being iterated.
    for key, val in headers.items():
      try:
        # Backslash goes first so that the backslashes introduced by the later
        # replacements are not themselves escaped again.
        val = val.replace("\\", "\\\\").replace("\n", "\\n").replace(":", "\\c")
      except (AttributeError, TypeError):
        # Values that are not text (e.g. the integer content-length set by send(),
        # or a bytes value) cannot be escaped this way and are sent unchanged.
        # Anything else - including KeyboardInterrupt - is deliberately not swallowed.
        continue
      headers[key] = val

  def send_frame(self, cmd, headers=None, body=""):
    """
    Encode and send a stomp frame
    through the underlying transport:

    Header values are escaped first for every command except ``CMD_CONNECT``.

    :param str cmd: the protocol command
    :param dict headers: a map of headers to include in the frame (escaped in place)
    :param body: the content of the message
    """
    # NOTE(review): connect() in this class sends CMD_STOMP rather than CMD_CONNECT, so the
    # connection frame's headers are escaped by this check as written - confirm that is intended.
    if cmd != CMD_CONNECT:
      if headers is None:
        headers = {}
      self._escape_headers(headers)
    frame = utils.Frame(cmd, headers, body)
    self.transport.transmit(frame)

  def abort(self, transaction, headers=None, **keyword_headers):
    """
    Abort a transaction.

    :param str transaction: the identifier of the transaction (must not be None)
    :param dict headers: a map of any additional headers the broker requires
    :param keyword_headers: any additional headers the broker requires
    """
    assert transaction is not None, "'transaction' is required"
    headers = utils.merge_headers([headers, keyword_headers])
    headers[HDR_TRANSACTION] = transaction
    self.send_frame(CMD_ABORT, headers)

  def ack(self, id, subscription, transaction=None):
    """
    Acknowledge 'consumption' of a message by id.

    :param str id: identifier of the message (must not be None)
    :param str subscription: the subscription this message is associated with (must not be None)
    :param str transaction: include the acknowledgement in the specified transaction
    """
    assert id is not None, "'id' is required"
    assert subscription is not None, "'subscription' is required"
    headers = {HDR_MESSAGE_ID: id, HDR_SUBSCRIPTION: subscription}
    if transaction:
      headers[HDR_TRANSACTION] = transaction
    self.send_frame(CMD_ACK, headers)

  def begin(self, transaction=None, headers=None, **keyword_headers):
    """
    Begin a transaction.

    :param str transaction: the identifier for the transaction (optional - if not specified
        a unique transaction id will be generated)
    :param dict headers: a map of any additional headers the broker requires
    :param keyword_headers: any additional headers the broker requires

    :return: the transaction id
    :rtype: str
    """
    headers = utils.merge_headers([headers, keyword_headers])
    if not transaction:
      transaction = str(uuid.uuid4())
    headers[HDR_TRANSACTION] = transaction
    self.send_frame(CMD_BEGIN, headers)
    return transaction

  def commit(self, transaction=None, headers=None, **keyword_headers):
    """
    Commit a transaction.

    :param str transaction: the identifier for the transaction (must not be None, despite
        the default)
    :param dict headers: a map of any additional headers the broker requires
    :param keyword_headers: any additional headers the broker requires
    """
    assert transaction is not None, "'transaction' is required"
    headers = utils.merge_headers([headers, keyword_headers])
    headers[HDR_TRANSACTION] = transaction
    self.send_frame(CMD_COMMIT, headers)

  def connect(
    self, username=None, passcode=None, wait=False, headers=None, **keyword_headers
  ):
    """
    Start a connection by sending a STOMP frame.

    The host header is only sent when the transport has a ``vhost`` configured.

    :param str username: the username to connect with
    :param str passcode: the password used to authenticate with
    :param bool wait: if True, wait for the connection to be established/acknowledged
    :param dict headers: a map of any additional headers the broker requires
    :param keyword_headers: any additional headers the broker requires

    :raises ConnectFailedException: if ``wait`` is True and the transport reports a
        connection error after waiting
    """
    cmd = CMD_STOMP
    headers = utils.merge_headers([headers, keyword_headers])
    headers[HDR_ACCEPT_VERSION] = self.version

    if self.transport.vhost:
      headers[HDR_HOST] = self.transport.vhost

    if username is not None:
      headers[HDR_LOGIN] = username

    if passcode is not None:
      headers[HDR_PASSCODE] = passcode

    self.send_frame(cmd, headers)

    if wait:
      self.transport.wait_for_connection()
      if self.transport.connection_error:
        raise ConnectFailedException()

  def disconnect(self, receipt=None, headers=None, **keyword_headers):
    """
    Disconnect from the server.

    Nothing is sent if the transport reports that it is not connected.

    :param str receipt: the receipt to use (once the server acknowledges that receipt, we're
        officially disconnected; optional - if not specified a unique receipt id will
        be generated)
    :param dict headers: a map of any additional headers the broker requires
    :param keyword_headers: any additional headers the broker requires
    """
    if not self.transport.is_connected():
      log.debug("Not sending disconnect, already disconnected")
      return
    headers = utils.merge_headers([headers, keyword_headers])
    rec = receipt or str(uuid.uuid4())
    headers[HDR_RECEIPT] = rec
    # set_receipt is not defined in this class - presumably supplied by the connection
    # class this protocol is combined with; verify.
    self.set_receipt(rec, CMD_DISCONNECT)
    self.send_frame(CMD_DISCONNECT, headers)

  def nack(self, id, subscription, transaction=None):
    """
    Let the server know that a message was not consumed.

    :param str id: the unique id of the message to nack (must not be None)
    :param str subscription: the subscription this message is associated with (must not be None)
    :param str transaction: include this nack in a named transaction
    """
    assert id is not None, "'id' is required"
    assert subscription is not None, "'subscription' is required"
    headers = {HDR_MESSAGE_ID: id, HDR_SUBSCRIPTION: subscription}
    if transaction:
      headers[HDR_TRANSACTION] = transaction
    self.send_frame(CMD_NACK, headers)

  def send(self, destination, body, content_type=None, headers=None, **keyword_headers):
    """
    Send a message to a destination in the messaging system (as per https://stomp.github.io/stomp-specification-1.2.html#SEND)

    :param str destination: the destination (such as a message queue - for example '/queue/test' - or a message topic)
    :param body: the content of the message (must not be None)
    :param str content_type: the MIME type of message
    :param dict headers: additional headers to send in the message frame
    :param keyword_headers: any additional headers the broker requires
    """
    assert destination is not None, "'destination' is required"
    assert body is not None, "'body' is required"
    headers = utils.merge_headers([headers, keyword_headers])
    headers[HDR_DESTINATION] = destination
    if content_type:
      headers[HDR_CONTENT_TYPE] = content_type
    body = encode(body)
    # The length is taken from the encoded body and never overrides a caller-supplied header.
    if self.auto_content_length and body and HDR_CONTENT_LENGTH not in headers:
      headers[HDR_CONTENT_LENGTH] = len(body)
    self.send_frame(CMD_SEND, headers, body)

  def subscribe(self, destination, id, ack="auto", headers=None, **keyword_headers):
    """
    Subscribe to a destination

    :param str destination: the topic or queue to subscribe to (must not be None)
    :param str id: the identifier to uniquely identify the subscription (must not be None)
    :param str ack: either auto, client or client-individual (see https://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE for more info)
    :param dict headers: a map of any additional headers to send with the subscription
    :param keyword_headers: any additional headers to send with the subscription
    """
    assert destination is not None, "'destination' is required"
    assert id is not None, "'id' is required"
    headers = utils.merge_headers([headers, keyword_headers])
    headers[HDR_DESTINATION] = destination
    headers[HDR_ID] = id
    headers[HDR_ACK] = ack
    self.send_frame(CMD_SUBSCRIBE, headers)

  def unsubscribe(self, id, headers=None, **keyword_headers):
    """
    Unsubscribe from a destination by its unique identifier

    :param str id: the unique identifier to unsubscribe from (must not be None)
    :param dict headers: additional headers to send with the unsubscribe
    :param keyword_headers: any additional headers to send with the unsubscribe
    """
    assert id is not None, "'id' is required"
    headers = utils.merge_headers([headers, keyword_headers])
    headers[HDR_ID] = id
    self.send_frame(CMD_UNSUBSCRIBE, headers)


class Protocol12(Protocol11):
  """
  Represents version 1.2 of the protocol (see https://stomp.github.io/stomp-specification-1.2.html).

  Differences from :py:class:`Protocol11` implemented here: carriage returns are escaped in
  header values, ACK/NACK identify the message through the id header only, and the host
  header is always sent on connect.

  Most users should not instantiate the protocol directly. See :py:mod:`stomp.connect` for connection classes.

  :param transport: the transport frames are sent through
  :param (int,int) heartbeats: heartbeat settings, passed on to ``Protocol11``
  :param bool auto_content_length: Whether to calculate and send the content-length header automatically if it has not been set
  """

  def __init__(self, transport, heartbeats=(0, 0), auto_content_length=True):
    Protocol11.__init__(self, transport, heartbeats, auto_content_length)
    # Set after the parent initialiser, which assigns "1.1".
    self.version = "1.2"

  def _escape_headers(self, headers):
    """
    Escape the header values of a frame in place.

    Backslash, line feed, colon and carriage return characters in each value are replaced
    by their two-character STOMP escape sequences. Header names are left untouched.

    :param dict(str,str) headers: the headers to escape; the dict is modified in place
    """
    # Only values of existing keys are reassigned, so the dict does not change size
    # while it is being iterated.
    for key, val in headers.items():
      try:
        # Backslash goes first so that the backslashes introduced by the later
        # replacements are not themselves escaped again.
        val = (
          val.replace("\\", "\\\\")
          .replace("\n", "\\n")
          .replace(":", "\\c")
          .replace("\r", "\\r")
        )
      except (AttributeError, TypeError):
        # Values that are not text (e.g. an integer or a bytes value) cannot be escaped
        # this way and are sent unchanged. Anything else - including KeyboardInterrupt -
        # is deliberately not swallowed.
        continue
      headers[key] = val

  def ack(self, id, transaction=None):
    """
    Acknowledge 'consumption' of a message by id.

    Unlike the 1.1 implementation, no subscription is needed; the value is sent in the
    id header.

    :param str id: identifier of the message (must not be None)
    :param str transaction: include the acknowledgement in the specified transaction
    """
    assert id is not None, "'id' is required"
    headers = {HDR_ID: id}
    if transaction:
      headers[HDR_TRANSACTION] = transaction
    self.send_frame(CMD_ACK, headers)

  def nack(self, id, transaction=None):
    """
    Let the server know that a message was not consumed.

    Unlike the 1.1 implementation, no subscription is needed; the value is sent in the
    id header.

    :param str id: the unique id of the message to nack (must not be None)
    :param str transaction: include this nack in a named transaction
    """
    assert id is not None, "'id' is required"
    headers = {HDR_ID: id}
    if transaction:
      headers[HDR_TRANSACTION] = transaction
    self.send_frame(CMD_NACK, headers)

  def connect(
    self, username=None, passcode=None, wait=False, headers=None, **keyword_headers
  ):
    """
    Send a STOMP CONNECT frame. Differs from 1.0 and 1.1 versions in that the HOST header is enforced.

    The host header defaults to the host part of the transport's ``current_host_and_port``
    and is replaced by the transport's ``vhost`` when one is configured.

    :param str username: optionally specify the login user
    :param str passcode: optionally specify the user password
    :param bool wait: wait for the connection to complete before returning
    :param dict headers: a map of any additional headers to send with the connect frame
    :param keyword_headers: any additional headers to send with the connect frame

    :raises ConnectFailedException: if ``wait`` is True and the transport reports a
        connection error after waiting
    """
    cmd = CMD_STOMP
    headers = utils.merge_headers([headers, keyword_headers])
    headers[HDR_ACCEPT_VERSION] = self.version
    # NOTE(review): assumes current_host_and_port is already populated (i.e. the transport
    # has been started) - confirm against callers.
    headers[HDR_HOST] = self.transport.current_host_and_port[0]

    if self.transport.vhost:
      headers[HDR_HOST] = self.transport.vhost

    if username is not None:
      headers[HDR_LOGIN] = username

    if passcode is not None:
      headers[HDR_PASSCODE] = passcode

    self.send_frame(cmd, headers)

    if wait:
      self.transport.wait_for_connection()
      if self.transport.connection_error:
        raise ConnectFailedException()
